// Copyright 2022 Huawei Cloud Computing Technology Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <mutex>
#include <deque>
#include <condition_variable>
#include <exception>
#include "CasBuffer.h"
#include "CasMsg.h"
#include "CasDataPipe.h"
#include "CasLog.h"

using namespace std;

// Creates an empty pipe that is immediately active.
//
// block: stored in m_block. When true, the getters wait on the condition
//        variable for data and Handle() notifies it after each push.
CasDataPipe::CasDataPipe(bool block)
{
    m_totalSize = 0;  // no payload bytes are queued yet
    m_status = true;  // pipe accepts and delivers packets until Exit()/destruction
    m_block = block;
}

// Marks the pipe inactive first, then releases every packet still queued.
CasDataPipe::~CasDataPipe()
{
    m_status = false;
    Clear();
}

void CasDataPipe::Clear() noexcept
{
    while (!(this->m_deque.empty())) {
        void *pPkt = this->m_deque.front();
        this->m_deque.pop_front();
        FreeBuffer(pPkt);
    }
}

void CasDataPipe::Exit()
{
    this->m_status = false;
    unique_lock<mutex> lck(this->m_lock);
    this->m_cv.notify_all();
}

void CasDataPipe::Handle(void *pPkt)
{
    try {
        unique_lock<mutex> lck(this->m_lock);
        if (this->m_status) {
            stream_msg_head_t *head = (stream_msg_head_t *)pPkt;
            this->m_totalSize += head->GetPayloadSize();
            this->m_unprocessedSize = m_deque.size() + 1;
            this->m_deque.push_back(pPkt);

            if (this->m_block) {
                this->m_cv.notify_one();
            }
        } else {
            FreeBuffer(pPkt);
        }
    } catch (exception &e) {
        FreeBuffer(pPkt);
        ERR("Deque push failed, exception is %s.", e.what());
    }
}

void *CasDataPipe::GetNextPkt()
{
    void *pPkt = nullptr;
    unique_lock<mutex> lck(this->m_lock);
    if (!this->m_status) {
        return nullptr;
    }

    if (this->m_block) {
        this->m_cv.wait(lck, [this]() -> bool { return ((!this->m_status) || (!(this->m_deque.empty()))); });
    }

    if ((this->m_status) && (!(this->m_deque.empty()))) {
        pPkt = this->m_deque.front();
        this->m_deque.pop_front();
        stream_msg_head_t *head = (stream_msg_head_t *)pPkt;
        this->m_totalSize -= head->GetPayloadSize();
    }
    return pPkt;
}

void *CasDataPipe::GetNextPktWaitFor(int timeout)
{
    void *pPkt = nullptr;

    unique_lock<mutex> lck(this->m_lock);

    if (!this->m_status) {
        return nullptr;
    }

    if (this->m_block) {
        this->m_cv.wait_for(lck, std::chrono::milliseconds(timeout),
                            [this]() -> bool { return ((!this->m_status) || (!(this->m_deque.empty()))); });
    }

    if ((this->m_status) && (!(this->m_deque.empty()))) {
        pPkt = this->m_deque.front();
        this->m_deque.pop_front();
        stream_msg_head_t *head = (stream_msg_head_t *)pPkt;
        this->m_totalSize -= head->GetPayloadSize();
    }
    return pPkt;
}

int CasDataPipe::GetNumItems()
{
    return this->m_deque.size();
}

void CasDataPipe::UpdateUnprocessedSize()
{
    this->m_unprocessedSize = GetNumItems();
}